package org.ultrapower.checkuser.core.hbase;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;

/**
 * hbase操作API
 */
public class HBaseHelper implements Serializable, Closeable
{

    private static final long serialVersionUID = 1L;
    protected static final Logger LOGGER = LoggerFactory.getLogger(HBaseHelper.class);
    private static final int DEFAULT_MAX_VERSIONS = 1;
    private Configuration config;
    private Admin admin;
    private Connection connection;

    public HBaseHelper()
    {
        config = HBaseConfiguration.create();
    }

    public HBaseHelper(Configuration config)
    {
        this.config = config;
        System.out.println(config.get("hbase.client.pause"));
    }

    /**
     * @param tableName
     * @return
     * @throws IOException
     * @author
     */
    public Table getTable(String tableName) throws IOException
    {
        return getConnection().getTable(TableName.valueOf(tableName));
    }

    /**
     * @return
     * @throws IOException
     * @author
     */
    public Admin getHBaseAdmin() throws IOException
    {
        if (admin == null)
        {
            admin = getConnection().getAdmin();
        }
        return admin;
    }

    /**
     * @param table
     * @author
     */
    public void doCloseTable(Table table)
    {
        if (table == null)
        {
            return;
        }
        try
        {
            table.close();
        } catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    /**
     * @param tableName
     * @param families
     * @author
     */
    public void createTable(String tableName, String[] families)
    {
        createTable(tableName, DEFAULT_MAX_VERSIONS, null, families);
    }

    /**
     * @param tableName
     * @param splitKeys
     * @param families
     * @author
     */
    public void createTable(String tableName, byte[][] splitKeys,
                            String[] families)
    {
        createTable(tableName, DEFAULT_MAX_VERSIONS, splitKeys, families);
    }

    /**
     * @param tableName
     * @param maxVersions
     * @param families
     * @author
     */
    public void createTable(String tableName, int maxVersions, String[] families)
    {
        createTable(tableName, maxVersions, null, families);
    }

    /**
     * @param tableName
     * @param family
     * @author
     */
    public void createTable(String tableName, int maxVersions,
                            byte[][] splitKeys, String[] families)
    {
        if (StringUtils.isBlank(tableName) || families == null
                || families.length <= 0)
        {
            return;
        }
        try
        {
            if (!getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                HTableDescriptor desc = new HTableDescriptor(
                        TableName.valueOf(tableName));
                for (String family : families)
                {
                    HColumnDescriptor columnDescriptor = new HColumnDescriptor(
                            family);
                    columnDescriptor.setCompressionType(Algorithm.SNAPPY);
                    columnDescriptor.setMaxVersions(maxVersions);
                    desc.addFamily(columnDescriptor);
                }
                if (splitKeys != null)
                {
                    getHBaseAdmin().createTable(desc, splitKeys);
                } else
                {
                    getHBaseAdmin().createTable(desc);
                }
            } else
            {
                LOGGER.warn("Table " + tableName + " already exists.");
            }
        } catch (IOException e)
        {
            LOGGER.error("", e);
        }
    }

    /**
     * @param tableName
     * @author
     */
    public void dropTable(String tableName)
    {
        Admin admin = null;
        try
        {
            admin = getHBaseAdmin();
            if (admin.tableExists(TableName.valueOf(tableName)))
            {
                admin.disableTable(TableName.valueOf(tableName));
                admin.deleteTable(TableName.valueOf(tableName));
            }
        } catch (IOException e)
        {
            LOGGER.error("drop table error." + e);
        } finally
        {
            if (null != admin)
            {
                try
                {
                    admin.close();
                } catch (IOException e)
                {
                    LOGGER.error("close admin error " + e);
                }
            }
        }
    }

    /**
     * @param tableName
     * @param rowkey
     * @return
     * @author
     */
    public byte[] get(String tableName, String rowkey, String family,
                      String qualifier) throws IOException
    {
        Table table = null;

        Get get = new Get(Bytes.toBytes(rowkey));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        table = getTable(tableName);
        if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
        {
            Result result = table.get(get);
            return result.getValue(Bytes.toBytes(family),
                    Bytes.toBytes(qualifier));
        } else
        {
            LOGGER.warn("Table " + tableName + " does not exist.");
        }

        return null;
    }

    /**
     * @param tableName
     * @param rowkey
     * @return
     * @author zhanglei11
     */
    public String getString(String tableName, String rowkey, String family,
                            String qualifier) throws IOException
    {
        return Bytes.toString(get(tableName, rowkey, family, qualifier));
    }

    /**
     * @param tableName
     * @param rowkey
     * @return
     * @author
     */
    public Map<String, byte[]> getMapByKeyAndFamily(String tableName,
                                                    String rowkey, String family)
    {
        Map<String, byte[]> map = new HashMap<String, byte[]>();
        Table table = null;
        try
        {
            Get get = new Get(Bytes.toBytes(rowkey));
            get.addFamily(Bytes.toBytes(family));
            table = getTable(tableName);
            if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                Result result = table.get(get);
                for (Cell cell : result.rawCells())
                {
                    byte[] q = CellUtil.cloneQualifier(cell);
                    byte[] v = CellUtil.cloneValue(cell);
                    map.put(Bytes.toString(q), v);
                }
            } else
            {
                LOGGER.warn("Table " + tableName + " does not exist.");
            }
        } catch (IOException e)
        {
            e.printStackTrace();
        }
        return map;
    }

    /**
     * @param tableName
     * @param rowkey
     * @return
     * @author
     */
    public Map<String, byte[]> getRowMap(String tableName, String rowkey)
    {
        Map<String, byte[]> map = new HashMap<String, byte[]>();
        Table table = null;
        try
        {
            Get get = new Get(Bytes.toBytes(rowkey));
            table = getTable(tableName);
            if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                Result result = table.get(get);
                for (Cell cell : result.rawCells())
                {
                    byte[] q = CellUtil.cloneQualifier(cell);
                    byte[] v = CellUtil.cloneValue(cell);
                    map.put(Bytes.toString(q), v);
                }
            } else
            {
                LOGGER.warn("Table " + tableName + " does not exist.");
            }
        } catch (IOException e)
        {
            e.printStackTrace();
        }
        return map;
    }

    /**
     * @param tableName
     * @param rowkeys
     * @return
     */
    public Result[] getRecodes(String tableName, List<String> rowkeys)
    {
        Table table = null;
        if (rowkeys == null || rowkeys.size() == 0)
        {
            LOGGER.warn("Has no rowkeys to get.");
            return null;
        }
        try
        {
            List<Get> gets = new ArrayList<Get>();
            Get get = null;
            for (String rowkey : rowkeys)
            {
                get = new Get(Bytes.toBytes(rowkey));
                gets.add(get);
            }
            table = getTable(tableName);
            if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                Result[] results = table.get(gets);
                return results;
            } else
            {
                LOGGER.warn("Table " + tableName + " does not exist.");
                return null;
            }
        } catch (IOException e)
        {
            LOGGER.error("get table [{}] recodes error", tableName, e);
            return null;
        }
    }

    /**
     * @param tableName
     * @param rowkey
     * @return
     */
    public Result getRecode(String tableName, String rowkey)
    {
        Table table = null;
        try
        {
            Get get = new Get(Bytes.toBytes(rowkey));
            table = getTable(tableName);
            if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                Result result = table.get(get);
                return result;
            } else
            {
                LOGGER.warn("Table " + tableName + " does not exist.");
                return null;
            }
        } catch (IOException e)
        {
            LOGGER.error("get table [{}] recodes error", tableName, e);
            return null;
        }
    }

    /**
     * @param tableName
     * @param gets
     * @return
     */
    public Result[] getRecodesByGets(String tableName, List<Get> gets)
    {
        Table table = null;
        if (gets == null || gets.size() == 0)
        {
            LOGGER.warn("Has no gets to get.");
            return null;
        }
        try
        {
            table = getTable(tableName);
            if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                Result[] results = table.get(gets);
                return results;
            } else
            {
                LOGGER.warn("Table " + tableName + " does not exist.");
                return null;
            }
        } catch (IOException e)
        {
            LOGGER.error("get table [{}] recodes error", tableName, e);
            return null;
        }
    }

    /**
     * @param tableName
     * @param get
     * @return
     */
    public Result getRecodeByGet(String tableName, Get get)
    {
        Table table = null;
        try
        {
            table = getTable(tableName);
            if (getHBaseAdmin().tableExists(TableName.valueOf(tableName)))
            {
                Result result = table.get(get);
                return result;
            } else
            {
                LOGGER.warn("Table " + tableName + " does not exist.");
                return null;
            }
        } catch (IOException e)
        {
            LOGGER.error("get table [{}] recodes error", tableName, e);
            return null;
        }
    }

    /**
     * @param tableName
     * @param start
     * @param end
     * @return Iterator<Result>
     */
    public Iterator<Result> getRecodeByRange(String tableName, String start, String end)
    {
        Table table = null;

        try
        {
            table = getTable(tableName);
            Scan scan = new Scan();
            scan.setMaxVersions(DEFAULT_MAX_VERSIONS);
            scan.setBatch(1000);
            scan.setStartRow(Bytes.toBytes(start));
            scan.setStopRow(Bytes.toBytes(end));

            //Filter filter = new PageFilter(2);
            //scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            Iterator<Result> res = scanner.iterator();
            return res;
        } catch (IOException e)
        {
            LOGGER.error("get table [{}] recodes error", tableName, e);
            return null;
        }
    }

    /**
     * @throws IOException
     * @throws Exception
     */
    public List<Map<String, String>> findByRangeAndColumn(String tableName, String start, String end, String family, String[] columns) throws IOException
    {
        List<Map<String, String>> list = new ArrayList<Map<String, String>>();
        Table table = null;
        System.out.println("start:" + start);
        System.out.println("end:" + end);
        try
        {
            table = getTable(tableName);
            Scan scan = new Scan();
            scan.setMaxVersions(DEFAULT_MAX_VERSIONS);
            scan.setBatch(1000);
            scan.setStartRow(Bytes.toBytes(start));
            scan.setStopRow(Bytes.toBytes(end));
            ResultScanner scanner = table.getScanner(scan);
            Iterator<Result> res = scanner.iterator();
            while (res.hasNext())
            {
                Result result = res.next();
                Map<String, String> map = new HashMap<String, String>();
                map.put("rowKay", new String(result.getRow()));
                for (String c : columns)
                {
                    map.put(c, new String(result.getValue(family.getBytes(), c.getBytes())));
                }
                list.add(map);
            }
            return list;
        } catch (Exception e)
        {
            e.printStackTrace();
        } finally
        {
            doCloseTable(table);
            close();
        }
        return null;
    }

    /**
     * @return
     * @author
     */
    public boolean isHBaseAvailable()
    {
        try
        {
            HBaseAdmin.checkHBaseAvailable(config);
        } catch (ZooKeeperConnectionException zkce)
        {
            LOGGER.error("", zkce);
            return false;
        } catch (MasterNotRunningException e)
        {
            LOGGER.error("", e);
            return false;
        } catch (Exception e)
        {
            LOGGER.error(
                    "Check HBase available throws an Exception. We don't know whether HBase is running or not.",
                    e);
            return false;
        }
        return true;
    }

    public Connection getConnection()
    {
        if (connection == null)
        {
            try
            {
                connection = ConnectionFactory.createConnection(config);
            } catch (IOException e)
            {
                LOGGER.error("Create HBase connect error.", e);
            }
        }
        return connection;
    }

    public void close() throws IOException
    {
        if (admin != null)
        {
            admin.close();
            admin = null;
        }
        if (connection != null)
        {
            connection.close();
            connection = null;
        }
    }
}
